use super::{
    current_worker, Attr, Config, GroupSender, MemCache, Task, TaskRef, Thread,
    ThreadProxy, Worker, TaskQueue,
};
use crate::channel::mpsc;
use crate::thread::{self, Mutex};
use crate::Error;
use core::cell::UnsafeCell;
use core::future::Future;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicBool, Ordering};
use hipool::{Boxed, BoxedMemPool, MemPool, NullAlloc, PoolAlloc};

/// Every allocation in this module is served by a leaked, `'static` memory pool.
type Pool = &'static MemPool;

// Per-worker control-channel endpoints carrying `Signo` signals.
pub(crate) type MpscSender = mpsc::Sender<'static, Signo, Pool>;
pub(crate) type MpscReceiver = mpsc::Receiver<'static, Signo, Pool>;

/// Control signals delivered to worker threads over their mpsc channels.
#[allow(non_camel_case_types)]
pub enum Signo {
    /// Ask the worker to shut down (sent once from `WorkerProxy::stop`).
    SIG_STOP,
    /// Ask the worker to release its task memory cache.
    SIG_CLEAR_CACHE,
    /// Carries the group's configured name; sent right after the worker
    /// thread is spawned (presumably used to name the thread — confirm).
    SIG_NAME(&'static str),
}

/// Global registry of active groups, indexed by group id (0..=255).
static GROUPS: GroupArray = GroupArray::new();
/// `Option<BoxedGroup>` is not `Copy`; a named `None` const is required
/// for the `[NON_GROUP; 256]` array initializer.
const NON_GROUP: Option<BoxedGroup> = None;

/// Fixed-size table of group slots with a single mutex for writers.
struct GroupArray {
    // Interior mutability: slots are mutated through the `UnsafeCell`
    // while writers serialize on `lock`; `get` reads without locking.
    groups: UnsafeCell<[Option<BoxedGroup>; 256]>,
    lock: Mutex<()>,
}

// SAFETY(review): slot mutation is serialized by `lock` (see the impl
// below); unlocked readers may race a replacement — confirm acceptable.
unsafe impl Sync for GroupArray {}

impl GroupArray {
    /// Creates an empty registry; all 256 slots start unoccupied.
    const fn new() -> Self {
        Self {
            groups: UnsafeCell::new([NON_GROUP; 256]),
            lock: Mutex::new(()),
        }
    }

    /// Reads slot `id` while holding the registry lock.
    ///
    /// NOTE(review): the returned reference outlives the guard, so the
    /// lock only serializes the slot read itself against writers.
    fn locked_get(&self, id: u8) -> Option<&Group> {
        // Bug fix: `let _ = self.lock.lock()` dropped the guard
        // immediately (`_` does not bind), leaving the read unlocked.
        // The guard must be bound to a named variable to stay alive.
        let _guard = self.lock.lock();
        self.get(id)
    }

    /// Reads slot `id` without taking the lock.
    fn get(&self, id: u8) -> Option<&Group> {
        // SAFETY: shared read of the slot array; writers serialize on `lock`.
        let groups = unsafe { &*self.groups.get() };
        groups[id as usize].as_deref()
    }

    /// Installs `grp` into its slot; fails with `EEXIST` when occupied.
    fn active(&self, grp: BoxedGroup) -> Result<(), Error> {
        let id = grp.id() as usize;
        // Bug fix: hold the guard across the whole check-then-insert
        // sequence; the old `let _ = ...` released the lock at once.
        let _guard = self.lock.lock();
        let groups = unsafe { &mut *self.groups.get() };
        if groups[id].is_some() {
            return Err(Error::new(libc::EEXIST));
        }
        groups[id].replace(grp);
        Ok(())
    }

    /// Replaces the group in `grp`'s slot, stopping the old one first.
    ///
    /// Fails with `EPERM` when called from a worker belonging to the
    /// same group: stopping it would join the caller's own thread.
    ///
    /// # Safety
    /// Caller must ensure no outstanding references to the old group
    /// remain in use after replacement (NOTE(review): confirm).
    unsafe fn reactive(&self, grp: BoxedGroup) -> Result<(), Error> {
        if let Some(worker) = current_worker() {
            if worker.group_id() == grp.id() {
                return Err(Error::new(libc::EPERM));
            }
        }

        let id = grp.id() as usize;
        // Bug fix: keep the lock held across stop + replace; the old
        // `let _ = ...` dropped the guard on the same line.
        let _guard = self.lock.lock();
        let groups = unsafe { &mut *self.groups.get() };
        if let Some(ref mut old_grp) = groups[id] {
            old_grp.stop();
        }
        groups[id].replace(grp);
        Ok(())
    }
}

/// Handle through which a group controls one of its worker threads.
#[repr(C)]
pub(crate) struct WorkerProxy {
    // Index of this worker inside the group's sender/queue slices.
    id: usize,
    // Raw pointer into the group's `MpscSender` slice (self-referential:
    // the slice is owned by the same `Group` that owns this proxy).
    mpscs: NonNull<[MpscSender]>,
    // Flipped once by `stop()` so the shutdown sequence runs only once.
    stopped: AtomicBool,
    thread: ThreadProxy,
    // The worker's task memory cache, with its lifetime extended to
    // 'static in `Group::new` (backed by the group's pool).
    cache: &'static MemCache,
}

// SAFETY(review): contains a raw slice pointer; soundness relies on the
// owning `Group` keeping the sender slice alive — confirm.
unsafe impl Sync for WorkerProxy {}

type BoxedWorkers = Boxed<'static, [Option<WorkerProxy>], Pool>;

impl WorkerProxy {
    /// Stops the worker exactly once: the first caller flips the flag,
    /// sends `SIG_STOP` and joins the thread; later calls are no-ops.
    fn stop(&mut self) {
        let already_stopped = self.stopped.swap(true, Ordering::Relaxed);
        if already_stopped {
            return;
        }
        self.signal(Signo::SIG_STOP);
        self.thread.join();
    }

    /// Blocking send of `sig` to this worker's control channel.
    fn signal(&self, sig: Signo) {
        // SAFETY: `mpscs` points at the sender slice owned by the group
        // that owns this proxy, so it is valid while `self` is alive.
        let senders = unsafe { self.mpscs.as_ref() };
        senders[self.id].send(sig);
    }

    /// Non-blocking send; hands `sig` back when the queue is full.
    fn try_signal(&self, sig: Signo) -> Result<(), Signo> {
        // SAFETY: same invariant as `signal`.
        let senders = unsafe { self.mpscs.as_ref() };
        senders[self.id].try_send(sig)
    }
}

impl Drop for WorkerProxy {
    /// Ensures the worker is signalled and its thread joined even when
    /// the owning group is dropped without an explicit `stop()` call.
    fn drop(&mut self) {
        self.stop();
    }
}

type BoxedQueue = Boxed<'static, [TaskQueue], Pool>;
type BoxedMpsc = Boxed<'static, [MpscSender], Pool>;

/// A task-execution runtime: worker threads, their task queues and
/// control channels, all allocated from one dedicated memory pool.
#[allow(dead_code)]
#[repr(C)]
pub struct Group {
    sender: GroupSender,
    // One task queue per worker; shared with workers via raw pointer.
    queues: BoxedQueue,
    // One control-channel sender per worker.
    mpscs: BoxedMpsc,
    workers: BoxedWorkers,
    conf: Config,
    // Owns the pool backing every field above. Declared last so it is
    // dropped last (Rust drops struct fields in declaration order).
    pool: BoxedMemPool<PoolAlloc>,
}

// SAFETY(review): cross-thread access goes through queues, channels and
// atomic flags; confirm no field is mutated without synchronization.
unsafe impl Sync for Group {}

type BoxedGroup = Boxed<'static, Group, NullAlloc>;
type GroupRef = &'static Group;

impl Group {
    /// Builds a new group: creates its memory pool, per-worker task
    /// queues, control channels and worker threads, then allocates the
    /// `Group` itself inside the same pool.
    ///
    /// # Errors
    /// Propagates allocation and thread-spawn failures.
    pub(crate) fn new(conf: &Config) -> Result<BoxedGroup, Error> {
        // Normalize qlen/nth (defaults and clamping) before sizing anything.
        let conf = Self::conf(conf);
        // Leak the pool to obtain a `'static` borrow usable by every
        // sub-allocation; `boxed_pool` re-takes ownership of the same
        // pool so it is still freed when the `Group` is dropped.
        let pool = MemPool::new_boxed(0)?.leak();
        let boxed_pool = unsafe { MemPool::from_raw(pool) };
        let nth = conf.nth;

        // One task queue per worker.
        let queues = Boxed::init_slice_then_in(&*pool, nth, |_, uninit| {
            uninit.write(TaskQueue::new());
            Ok(())
        })?;
        // Paired mpsc endpoints: senders stay in the group, receivers
        // are handed out to the workers below.
        let mut channels = Channels::new(&*pool, nth, conf.qlen)?;
        let mpscs = Boxed::init_slice_then_in(&*pool, nth, |n, uninit| {
            uninit.write(channels.mpsc_send(n));
            Ok(())
        })?;
        let mut workers = Boxed::init_slice_then_in(&*pool, nth, |_, uninit| {
            uninit.write(None);
            Ok(())
        })?;

        for n in 0..nth {
            let worker = Worker::new_in(
                pool,
                conf.id,
                n as u16,
                channels.mpsc_recv(n),
                NonNull::from(queues.as_ref()),
            )?;
            // Extend the cache borrow to 'static: the cache lives in
            // pool-backed storage that outlives the proxy
            // (NOTE(review): soundness depends on the pool outliving
            // every proxy — confirm).
            let cache = unsafe { &*(worker.cache() as *const _) };
            let thread = Thread::new_in(&*pool, worker)?;
            let proxy = WorkerProxy {
                id: n,
                mpscs: NonNull::from(mpscs.as_ref()),
                thread,
                cache,
                stopped: AtomicBool::new(false),
            };
            // Best-effort: a full queue just means the name is skipped.
            let _ = proxy.try_signal(Signo::SIG_NAME(conf.name));
            workers[n].replace(proxy);
        }

        let group = Boxed::new_in(
            &*pool,
            Self {
                sender: GroupSender::new(),
                queues,
                mpscs,
                workers,
                conf,
                pool: boxed_pool,
            },
        )
        .map(|boxed| boxed.into())?;

        Ok(group)
    }

    /// Registers a newly-built group in the global registry.
    pub(crate) fn active(grp: BoxedGroup) -> Result<(), Error> {
        GROUPS.active(grp)
    }

    /// Replaces an already-registered group; see `GroupArray::reactive`.
    pub(crate) unsafe fn reactive(grp: BoxedGroup) -> Result<(), Error> {
        GROUPS.reactive(grp)
    }

    /// If the `task_mem_cache` feature is enabled, forces every worker
    /// of group `id` to reclaim its cache.
    pub fn clean_cache(id: u8) {
        if let Some(grp) = GROUPS.locked_get(id) {
            for mpsc in grp.mpscs.iter() {
                mpsc.send(Signo::SIG_CLEAR_CACHE);
            }
        }
    }
}

impl Group {
    /// Returns the runtime with id `id`; panics when that runtime has
    /// not been started (i.e. never registered via [`Group::active`]).
    pub fn get(id: u8) -> GroupRef {
        // Bug fix: the old `expect("Runtime {id} don't be inited")`
        // printed "{id}" literally — `expect` takes a plain &str and
        // performs no formatting. `panic!` formats the id for real.
        GROUPS
            .get(id)
            .unwrap_or_else(|| panic!("Runtime {} is not initialized", id))
    }

    /// Wraps `future` in a task and enqueues it.
    ///
    /// `attr.hash == 0` lets the group balance the task across workers;
    /// a non-zero hash pins the task to queue `hash % nth`.
    pub(crate) fn spawn<T: Future>(&self, future: T, attr: &Attr) -> Result<TaskRef, Error> {
        let mut task = Task::new(future, attr)?;
        let queue = self.queues.as_ref();
        if attr.hash == 0 {
            self.sender.send(queue, task.clone());
        } else {
            let id = attr.hash % queue.len();
            task.status.set_local(id as u16);
            queue[id].push(task.clone());
        }
        Ok(task)
    }

    /// Re-schedules a task: onto its pinned worker queue when `local`
    /// is set, otherwise through the load-balancing sender.
    pub(crate) fn sched(&self, task: TaskRef, local: Option<u16>) {
        let queue = self.queues.as_ref();
        // A `match` replaces the previous is_none()/unwrap() pair: it
        // compiles to the same single branch and drops the clippy allow.
        match local {
            Some(n) => queue[n as usize].push(task),
            None => self.sender.send(queue, task),
        }
    }

    /// Task memory cache of worker `worker`.
    pub(crate) fn task_cache(&self, worker: u16) -> &'static MemCache {
        self.worker(worker).cache
    }

    /// Group id from the configuration.
    pub(crate) fn id(&self) -> u8 {
        self.conf.id
    }

    /// Proxy of worker `id`; panics if the slot was never filled
    /// (all `nth` slots are filled by `Group::new`).
    pub(crate) fn worker(&self, id: u16) -> &WorkerProxy {
        self.workers[id as usize]
            .as_ref()
            .expect("worker slot not initialized")
    }
}

impl Group {
    /// Stops every spawned worker; tolerates empty (never-filled) slots.
    fn stop(&mut self) {
        for slot in &mut *self.workers {
            if let Some(w) = slot {
                w.stop();
            }
        }
    }

    /// Produces a normalized copy of `conf` with qlen/nth defaulted
    /// and clamped into their valid ranges.
    fn conf(conf: &Config) -> Config {
        let mut normalized = conf.clone();
        normalized.qlen = Self::get_qlen(conf.qlen);
        normalized.nth = Self::get_nth(conf.nth);
        normalized
    }

    /// Queue length: caller's value, or 8 when unset (zero).
    fn get_qlen(qlen: usize) -> usize {
        match qlen {
            0 => 8,
            n => n,
        }
    }

    /// Worker count: CPU count when unset (zero), otherwise clamped to
    /// half of `u16::MAX` (NOTE(review): upper bit of the u16 worker id
    /// appears reserved — confirm).
    fn get_nth(nth: usize) -> usize {
        const MAX_WORKERS: usize = (u16::MAX as usize) >> 1;
        match nth {
            0 => thread::get_cpu_count(),
            n => n.min(MAX_WORKERS),
        }
    }
}

/// Staging area for per-worker mpsc endpoints built during `Group::new`:
/// senders are cloned out into the group, receivers are moved out one
/// by one into the workers.
struct Channels {
    mpsc_send: Boxed<'static, [Option<MpscSender>], PoolAlloc>,
    mpsc_recv: Boxed<'static, [Option<MpscReceiver>], PoolAlloc>,
}

impl Channels {
    /// Builds `nth` mpsc channel pairs, each with capacity `qlen`,
    /// allocating the channel storage from `pool`.
    fn new(pool: Pool, nth: usize, qlen: usize) -> Result<Self, Error> {
        // Start with all-`None` slots, then fill sender/receiver pairs.
        let mut senders =
            Boxed::init_slice_then_in::<Option<MpscSender>, _>(PoolAlloc, nth, |_, slot| {
                slot.write(None);
                Ok(())
            })?;
        let mut receivers =
            Boxed::init_slice_then_in::<Option<MpscReceiver>, _>(PoolAlloc, nth, |_, slot| {
                slot.write(None);
                Ok(())
            })?;
        for (tx_slot, rx_slot) in senders.iter_mut().zip(receivers.iter_mut()) {
            let (tx, rx) = mpsc::channel_in(pool, qlen)?;
            *tx_slot = Some(tx);
            *rx_slot = Some(rx);
        }
        Ok(Self {
            mpsc_send: senders,
            mpsc_recv: receivers,
        })
    }

    /// Moves receiver `n` out of its slot; panics if already taken.
    fn mpsc_recv(&mut self, n: usize) -> MpscReceiver {
        let slot = &mut self.mpsc_recv[n];
        slot.take().unwrap()
    }

    /// Returns a fresh clone of sender `n`; panics on an empty slot.
    fn mpsc_send(&mut self, n: usize) -> MpscSender {
        let sender = self.mpsc_send[n].as_ref().unwrap();
        sender.clone()
    }
}
